Skip to content

feat: allows hook in the subscriptions#1309

Merged
dkorittki merged 62 commits intomasterfrom
topic/streams-v1
Nov 21, 2025
Merged

feat: allows hook in the subscriptions#1309
dkorittki merged 62 commits intomasterfrom
topic/streams-v1

Conversation

@alepane21
Copy link
Copy Markdown
Contributor

@alepane21 alepane21 commented Oct 6, 2025

This PR introduces hooks inside the subscription lifecycle. We also decided to remove old pubsub implementation that is already deprecated and the router is not using anymore.

This change is primarily needed for Cosmo Streams support in the router. It allows to run a hook during the initialization of a subscription client. The changes are meant to be backwards compatible to the router, so the router can use an engine version with these changes without adjustments.

Summary by CodeRabbit

  • New Features

    • Added startup hooks for GraphQL subscriptions to run custom initialization when a subscription starts.
    • Improved subscription lifecycle with per-subscription update and close signaling, plus startup error propagation to clients.
  • Revert

    • Removed the Pub/Sub datasource implementation — all NATS and Kafka integrations, configurations, managers, and related tests were deleted.

✏️ Tip: You can customize this high-level summary in your review settings.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.

alepane21 and others added 30 commits July 18, 2025 18:56
…resolve at the initialization of each client subscription
…ce to fix when a subscription is completed while a hook is still sending messages)
…tarthandler' into ale/eng-7600-add-subscriptiononstarthandler
@coderabbitai summary

## Checklist

- [ ] I have discussed my proposed changes in an issue and have received
approval to proceed.
- [ ] I have followed the coding standards of the project.
- [ ] Tests or benchmarks have been added or updated.

I adjusted two tests which are failing during Githubs CI run:

```
--- FAIL: TestResolver_ResolveGraphQLSubscription (31.69s)
    --- FAIL: TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_only_updates_the_right_subscription (0.01s)
        resolve_test.go:5604: 
            	Error Trace:	/home/runner/work/graphql-go-tools/graphql-go-tools/v2/pkg/engine/resolve/resolve_test.go:5604
            	Error:      	Should be true
            	Test:       	TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_only_updates_the_right_subscription
    --- FAIL: TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_on_multiple_subscriptions_with_same_trigger_works (0.01s)
        resolve_test.go:5674: 
            	Error Trace:	/home/runner/work/graphql-go-tools/graphql-go-tools/v2/pkg/engine/resolve/resolve_test.go:5674
            	Error:      	should not be here
            	Test:       	TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_on_multiple_subscriptions_with_same_trigger_works
FAIL
```

These tests work locally, at least on my machine. I assumed it only
manifests itself on slower CPU´s like it's the case on Github workers.
After testing around with some time.Sleeps() here and there, I found
that if messages in tests are not emitted fast enough, then heartbeat
messages might find their way into the recorders message list. As these
tests have nothing to do with heartbeat testing I disabled them here. I
am not 100% sure this is the problem, but I am fairly certain and I
think it makes sense anyway.
@coderabbitai summary

## Checklist

- [x] I have discussed my proposed changes in an issue and have received
approval to proceed.
- [x] I have followed the coding standards of the project.
- [x] Tests or benchmarks have been added or updated.

# Context

Some notes first:
- Its only happening for tests introduced on the cosmo streams topic
branch
- It seems to be a race condition in tests rather than actual engine
code

I spotted two tests failing on Github Actions due to race conditions.
They work locally and are CPU timings related.

Those two tests are
- test 1 `SubscriptionOnStart ctx updater only updates the right
subscription`
- test 2 `SubscriptionOnStart ctx updater on multiple subscriptions with
same trigger works`

### test 1:
There is a race condition going on. Here is the output of the test on
Github runners with engine logs enabled.
```
resolver:trigger:subscription:add:17241709254077376921:1
resolver:create:trigger:17241709254077376921
resolver:trigger:start:17241709254077376921
resolver:subscription_updater:update:17241709254077376921
resolver:trigger:initialized:17241709254077376921
resolver:subscription_updater:update:17241709254077376921
resolver:trigger:subscription:update:17241709254077376921:1,1
resolver:trigger:update:17241709254077376921
resolver:trigger:subscription:add:17241709254077376921:2
resolver:trigger:subscription:added:17241709254077376921:2
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:1
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:1
resolver:trigger:started:17241709254077376921
resolver:subscription_updater:complete:17241709254077376921
resolver:subscription_updater:complete:sent_event:17241709254077376921
resolver:trigger:complete:17241709254077376921
resolver:trigger:complete:17241709254077376921
resolver:trigger:subscription:closed:17241709254077376921:1
resolver:trigger:subscription:closed:17241709254077376921:2

recorder 1 messages: [{"data":{"counter":1000}} {"data":{"counter":0}}]
recorder 2 messages: []
```

As you can see recorder 2 misses its one expected message. The reason is
that we update the trigger with the counter=0 message (line 8) before
the second subscriber is added (line 9). So it misses the message. This
happens because in the test we don't wait for the subscriber to finish
registration on the trigger before sending the counter=0 message. Now we
actually wait for that.

### test 2:
Kind of the same error. Here is the engine debug output from a failing
Github Actions run:

```
resolver:trigger:subscription:add:15889878720417707388:1
resolver:create:trigger:15889878720417707388
resolver:trigger:start:15889878720417707388
resolver:subscription_updater:update:15889878720417707388
resolver:trigger:initialized:15889878720417707388
resolver:subscription_updater:update:15889878720417707388
resolver:trigger:subscription:update:15889878720417707388:1,1
resolver:trigger:update:15889878720417707388
resolver:trigger:subscription:add:15889878720417707388:2
resolver:trigger:subscription:added:15889878720417707388:2
resolver:subscription_updater:update:15889878720417707388
resolver:trigger:subscription:update:15889878720417707388:1,2
resolver:trigger:subscription:update:2
resolver:trigger:started:15889878720417707388
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:2
resolver:trigger:subscription:flushed:1
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:1
resolver:subscription_updater:complete:15889878720417707388
resolver:subscription_updater:complete:sent_event:15889878720417707388
resolver:trigger:complete:15889878720417707388
resolver:trigger:complete:15889878720417707388
resolver:trigger:subscription:closed:15889878720417707388:1
resolver:trigger:subscription:closed:15889878720417707388:2

recorder 1 messages: [{"data":{"counter":1000}} {"data":{"counter":0}}]
recorder 2 messages: [{"data":{"counter":1000}}]
```

As you can see recorder 2 misses the counter=0 message. Both are
expected to have the same messages in the same order. Both recorders
have the counter=1000 message, which is delivered via
subscription-on-start hook but recorder 2 misses the counter=0 message,
delivered via fake stream. The count=0 message is delivered (line 8)
before recorder 2 is subscribed (line 9). This happens because in this
test, like in the other, we don't wait for the recorders to finish
subscribing to the trigger, and sending off the counter=0 messages via
fake stream early. Its fixed by waiting for a complete subscription.
@dkorittki dkorittki requested a review from jensneuse November 10, 2025 14:36
@dkorittki
Copy link
Copy Markdown
Contributor

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Nov 10, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (3)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (3)

8274-8286: Interface shims OK; add compile-time assertion

No-op methods correctly satisfy the expanded resolve.SubscriptionUpdater. Suggest adding a compile-time interface check to avoid future drift.

 type testSubscriptionUpdaterChan struct {
   updates  chan string
   complete chan struct{}
   closed   chan resolve.SubscriptionCloseKind
 }
 
+// ensure interface stays satisfied
+var _ resolve.SubscriptionUpdater = (*testSubscriptionUpdaterChan)(nil)

8412-8424: Mirror compile-time check for the second test updater

Same suggestion as above to guard against interface changes.

 type testSubscriptionUpdater struct {
   updates []string
   done    bool
   closed  bool
   mux     sync.Mutex
 }
 
+// ensure interface stays satisfied
+var _ resolve.SubscriptionUpdater = (*testSubscriptionUpdater)(nil)

8998-9051: Good coverage of SubscriptionOnStart; consider two extra cases

Solid tests for invocation and error propagation. Consider:

  • An integration test asserting Start() invokes SubscriptionOnStart and propagates hook errors (e.g., returns resolve.ErrUnableToResolve if that’s the contract).
  • A hook that calls ctx.Updater(...) to verify startup hooks can emit an initial event. Based on learnings.

If helpful, I can draft a small test that fails if SubscriptionOnStart isn’t invoked from Start().

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ecbac93 and 067b413.

📒 Files selected for processing (12)
  • v2/pkg/engine/datasource/graphql_datasource/configuration.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (4 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go (0 hunks)
  • v2/pkg/engine/resolve/datasource.go (1 hunks)
  • v2/pkg/engine/resolve/resolve.go (11 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (20 hunks)
💤 Files with no reviewable changes (6)
  • v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go
  • v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.
📚 Learning: 2025-09-19T14:51:33.724Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Applied to files:

  • v2/pkg/engine/resolve/resolve.go
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-09-19T14:50:19.528Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:999-1011
Timestamp: 2025-09-19T14:50:19.528Z
Learning: In the graphql-go-tools resolver, when handling backpressure for async unsubscribe operations, the team prefers using individual goroutines over bounded deferrer implementations. The rationale is that bounded implementations can also overflow, and the preferred solution for overload is scaling the deployment rather than internal buffering. The current approach prevents deadlocks at the cost of goroutine creation, which is an acceptable tradeoff.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-08-29T09:35:47.969Z
Learnt from: ysmolski
Repo: wundergraph/graphql-go-tools PR: 1282
File: v2/pkg/engine/plan/visitor.go:5-5
Timestamp: 2025-08-29T09:35:47.969Z
Learning: The wundergraph/graphql-go-tools project does not support Go versions < 1.23, so compatibility concerns for features available in Go 1.21+ (like cmp.Or) should not be raised.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/configuration.go
📚 Learning: 2025-10-16T13:05:19.838Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1323
File: v2/pkg/engine/datasource/grpc_datasource/compiler.go:683-702
Timestamp: 2025-10-16T13:05:19.838Z
Learning: In GraphQL field resolver context resolution (v2/pkg/engine/datasource/grpc_datasource/compiler.go), when traversing paths in resolveContextDataForPath, the code can safely assume that intermediate path segments will only be messages or lists, never scalars. This is because field resolvers are only defined on GraphQL object types, not scalar types, so the parent function must return either a message or a list. This invariant is enforced by the GraphQL type system design.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/configuration.go
🔇 Additional comments (2)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

4009-4010: LGTM: test wiring matches new client field

Using SubscriptionSource.client here reflects the new startup-hook wiring path and keeps tests realistic.

v2/pkg/engine/resolve/resolve_test.go (1)

797-824: Good defensive programming with nil check

The implementation properly checks for nil before invoking the subscriptionOnStartFn callback, which maintains backward compatibility and prevents panics when the hook is not provided.

Comment thread v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go Outdated
Comment thread v2/pkg/engine/resolve/resolve_test.go
Comment thread v2/pkg/engine/resolve/resolve_test.go
@dkorittki dkorittki marked this pull request as ready for review November 10, 2025 15:26
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
v2/pkg/engine/resolve/resolve_test.go (2)

4797-4803: Startup hook plumbing in test fake stream: good; consider options for ergonomics

Adding subscriptionOnStartFn to createFakeStream is correct and keeps tests focused. To reduce widespread nils and improve readability, consider an options struct (e.g., fakeStreamOpts{messageFn, delay, onStart, onStartHook}) instead of multiple positional params.


5561-5564: Make UniqueRequestID overrides write to the digest for deterministic grouping

Returning nil without writing to xxh relies on an all-zero digest. Write a constant to ensure stable “same trigger” semantics and future-proof against internal changes.

Apply this diff:

-        fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
-            return nil
-        }
+        fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) error {
+            if _, err := xxh.WriteString("same-trigger"); err != nil {
+                return err
+            }
+            _, err := xxh.Write(input)
+            return err
+        }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 067b413 and 423a60c.

📒 Files selected for processing (2)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (20 hunks)
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.
📚 Learning: 2025-09-19T14:51:33.724Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-09-19T14:50:19.528Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:999-1011
Timestamp: 2025-09-19T14:50:19.528Z
Learning: In the graphql-go-tools resolver, when handling backpressure for async unsubscribe operations, the team prefers using individual goroutines over bounded deferrer implementations. The rationale is that bounded implementations can also overflow, and the preferred solution for overload is scaling the deployment rather than internal buffering. The current approach prevents deadlocks at the cost of goroutine creation, which is an acceptable tradeoff.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-10-16T13:05:19.838Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1323
File: v2/pkg/engine/datasource/grpc_datasource/compiler.go:683-702
Timestamp: 2025-10-16T13:05:19.838Z
Learning: In GraphQL field resolver context resolution (v2/pkg/engine/datasource/grpc_datasource/compiler.go), when traversing paths in resolveContextDataForPath, the code can safely assume that intermediate path segments will only be messages or lists, never scalars. This is because field resolvers are only defined on GraphQL object types, not scalar types, so the parent function must return either a message or a list. This invariant is enforced by the GraphQL type system design.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-10-15T13:34:15.892Z
Learnt from: ysmolski
Repo: wundergraph/graphql-go-tools PR: 1322
File: v2/pkg/astvalidation/operation_rule_defer_stream_on_root_fields.go:92-127
Timestamp: 2025-10-15T13:34:15.892Z
Learning: In the graphql-go-tools repository, validation for defer and stream directives runs after normalization, which performs fragment inlining. Therefore, fragment spreads don't exist in the AST when these validation rules execute—they're already expanded into inline fragments or fields.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-08-08T09:43:07.433Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1262
File: v2/pkg/engine/datasource/grpc_datasource/json_builder.go:0-0
Timestamp: 2025-08-08T09:43:07.433Z
Learning: In v2/pkg/engine/datasource/grpc_datasource/json_builder.go, mergeEntities intentionally uses the loop index when calling indexMap.getResultIndex because the index map is type-aware, making per-type counters unnecessary under the current assumptions. Avoid suggesting per-type ordinal counters for this path in future reviews.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-07-02T15:28:02.122Z
Learnt from: SkArchon
Repo: wundergraph/graphql-go-tools PR: 1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
🧬 Code graph analysis (2)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (2)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)
  • SubscriptionOnStartFn (110-110)
v2/pkg/engine/resolve/resolve.go (1)
  • StartupHookContext (595-598)
v2/pkg/engine/resolve/resolve_test.go (2)
v2/pkg/engine/resolve/resolve.go (2)
  • StartupHookContext (595-598)
  • SubscriptionIdentifier (1017-1020)
v2/pkg/engine/resolve/context.go (2)
  • Context (16-35)
  • ExecutionOptions (37-41)
🔇 Additional comments (5)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3)

452-453: LGTM!

The subscription startup hooks are correctly wired from the configuration into the SubscriptionSource during subscription setup.


1956-1959: LGTM!

The struct fields appropriately store the subscription client and startup hook functions for later invocation.


2011-2020: LGTM!

The implementation correctly executes all registered startup hooks sequentially, returning early only if a hook returns an error. This matches the fix confirmed in the past review comments.

v2/pkg/engine/resolve/resolve_test.go (2)

4819-4824: No-op safe SubscriptionOnStart

Nil-guard with delegation is correct and keeps fake compliant with the new interface. LGTM.


5872-5901: Error propagation from SubscriptionOnStart hook is validated

Good addition. Confirms hook errors surface as GraphQL errors without failing the resolver call.

Comment thread v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go Outdated
Comment thread v2/pkg/engine/resolve/resolve_test.go
The hookCompleted channel was never read from, only written to.
The test also blocked when writing the second time to that channel, until test timeout, when run in isolation.
It mistankingly was displayed as successfully finished when run via
go test ./... because the timeout there is different and then the internal timeout of the test was faster and mistankingly marked the test as successfully finished.

This got fixed by using a waitgroup, which is a better fit here. It prevents blocking the routines, and we now actually wait for the routines to finish or we fail.

Also the heartbeat message is not "{}" but "heartbeat". This got fixed as well
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
v2/pkg/engine/resolve/resolve_test.go (2)

5493-5522: Consider decoupling this test from heartbeat behavior

The test correctly validates that the startup hook’s updater can inject a message before the first upstream event, but it also assumes there will be exactly two messages total. If heartbeat behavior changes in the future, this could become brittle even though the core contract still holds.

You could optionally filter out "heartbeat" entries before asserting length and order, e.g.:

-   recorder.AwaitComplete(t, defaultTimeout)
-   assert.Equal(t, 2, len(recorder.Messages()))
-   assert.Equal(t, `{"data":{"counter":1000}}`, recorder.Messages()[0])
-   assert.Equal(t, `{"data":{"counter":0}}`, recorder.Messages()[1])
+   recorder.AwaitComplete(t, defaultTimeout)
+   var dataMsgs []string
+   for _, m := range recorder.Messages() {
+       if m != "heartbeat" {
+           dataMsgs = append(dataMsgs, m)
+       }
+   }
+   assert.Equal(t, 2, len(dataMsgs))
+   assert.Equal(t, `{"data":{"counter":1000}}`, dataMsgs[0])
+   assert.Equal(t, `{"data":{"counter":0}}`, dataMsgs[1])

5524-5632: Per-subscription updater scoping test is correct; one small readability tweak

This test is a good, race-safe way to verify that, even when two subscriptions share the same trigger/stream, only the intended subscription receives the startup-hook update (using uniqueRequestFn and an atomic guard). This fits well with the single-threaded subscription event-loop model. Based on learnings.

You can optionally drop the extra startupHookWaitGroup.Wait() after the select since the goroutine that closes done is already waiting on the same WaitGroup:

-   go func() {
-       startupHookWaitGroup.Wait()
-       close(done)
-   }()
-   select {
-   case <-done:
-   case <-time.After(defaultTimeout):
-       t.Fatal("timed out waiting for subscription startup hooks")
-   }
-
-   // Wait for both subscriptions startup hooks to be executed
-   startupHookWaitGroup.Wait()
+   go func() {
+       startupHookWaitGroup.Wait()
+       close(done)
+   }()
+   select {
+   case <-done:
+   case <-time.After(defaultTimeout):
+       t.Fatal("timed out waiting for subscription startup hooks")
+   }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 423a60c and a4f1891.

📒 Files selected for processing (2)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (20 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.
📚 Learning: 2025-09-19T14:51:33.724Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-09-19T14:50:19.528Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:999-1011
Timestamp: 2025-09-19T14:50:19.528Z
Learning: In the graphql-go-tools resolver, when handling backpressure for async unsubscribe operations, the team prefers using individual goroutines over bounded deferrer implementations. The rationale is that bounded implementations can also overflow, and the preferred solution for overload is scaling the deployment rather than internal buffering. The current approach prevents deadlocks at the cost of goroutine creation, which is an acceptable tradeoff.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-11-19T09:42:17.644Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1341
File: v2/pkg/engine/datasource/grpc_datasource/execution_plan_visitor_federation.go:406-429
Timestamp: 2025-11-19T09:42:17.644Z
Learning: In the wundergraph/graphql-go-tools gRPC datasource implementation (v2/pkg/engine/datasource/grpc_datasource), field resolvers must have arguments. The system does not currently support defining field resolvers without arguments. This invariant ensures that the `parentCallID` increment in `enterFieldResolver` is always matched by a decrement in `LeaveField` (which checks `r.operation.FieldHasArguments(ref)`).

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-10-16T13:05:19.838Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1323
File: v2/pkg/engine/datasource/grpc_datasource/compiler.go:683-702
Timestamp: 2025-10-16T13:05:19.838Z
Learning: In GraphQL field resolver context resolution (v2/pkg/engine/datasource/grpc_datasource/compiler.go), when traversing paths in resolveContextDataForPath, the code can safely assume that intermediate path segments will only be messages or lists, never scalars. This is because field resolvers are only defined on GraphQL object types, not scalar types, so the parent function must return either a message or a list. This invariant is enforced by the GraphQL type system design.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-11-19T10:53:06.342Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1341
File: v2/pkg/engine/datasource/grpc_datasource/execution_plan.go:1039-1097
Timestamp: 2025-11-19T10:53:06.342Z
Learning: In v2/pkg/engine/datasource/grpc_datasource field resolver response handling, the `resolveRequiredFields` function intentionally uses two distinct approaches: for simple GraphQL object types it populates `message.Fields`, while for composite types (interface/union) it exclusively uses `message.FieldSelectionSet` with fragment-based selections. This differs from `buildFieldMessage` (regular queries) because field resolver responses returning composite types must align with protobuf oneOf structure, where all selections—including common interface fields—are handled through fragment selections built by `buildCompositeField`. The two approaches cannot be mixed in field resolver responses.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-10-15T13:34:15.892Z
Learnt from: ysmolski
Repo: wundergraph/graphql-go-tools PR: 1322
File: v2/pkg/astvalidation/operation_rule_defer_stream_on_root_fields.go:92-127
Timestamp: 2025-10-15T13:34:15.892Z
Learning: In the graphql-go-tools repository, validation for defer and stream directives runs after normalization, which performs fragment inlining. Therefore, fragment spreads don't exist in the AST when these validation rules execute—they're already expanded into inline fragments or fields.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-08-08T09:43:07.433Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1262
File: v2/pkg/engine/datasource/grpc_datasource/json_builder.go:0-0
Timestamp: 2025-08-08T09:43:07.433Z
Learning: In v2/pkg/engine/datasource/grpc_datasource/json_builder.go, mergeEntities intentionally uses the loop index when calling indexMap.getResultIndex because the index map is type-aware, making per-type counters unnecessary under the current assumptions. Avoid suggesting per-type ordinal counters for this path in future reviews.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-07-02T15:28:02.122Z
Learnt from: SkArchon
Repo: wundergraph/graphql-go-tools PR: 1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
🧬 Code graph analysis (2)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (2)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)
  • SubscriptionOnStartFn (110-110)
v2/pkg/engine/resolve/resolve.go (1)
  • StartupHookContext (595-598)
v2/pkg/engine/resolve/resolve_test.go (2)
v2/pkg/engine/resolve/resolve.go (2)
  • StartupHookContext (595-598)
  • SubscriptionIdentifier (1017-1020)
v2/pkg/engine/resolve/context.go (2)
  • Context (16-35)
  • ExecutionOptions (37-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build and test (go 1.25 / windows-latest)
  • GitHub Check: Build and test (go 1.25 / ubuntu-latest)
  • GitHub Check: Build and test (go 1.25 / ubuntu-latest)
🔇 Additional comments (12)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3)

449-454: Startup hooks correctly wired into SubscriptionSource

Passing p.config.subscription.StartupHooks into SubscriptionSource alongside the existing subscriptionClient cleanly exposes startup hooks to the data source without altering existing behavior. Looks good.


1957-1959: SubscriptionSource state extended safely with startup hooks

Adding subscriptionOnStartFns []SubscriptionOnStartFn to SubscriptionSource keeps hook configuration immutable after construction and maintains a simple, concurrency-safe shape for the data source.


2009-2020: Sequential startup hook execution with proper error short‑circuiting

SubscriptionOnStart now iterates all configured hooks, returning early on the first non‑nil error and otherwise succeeding. This matches the comment, propagates failures correctly to the caller, and keeps hooks running synchronously on the same goroutine, which fits the single‑threaded subscription event loop expectations for StartupHookContext. Based on learnings.

v2/pkg/engine/resolve/resolve_test.go (9)

4797-4851: Fake stream helper correctly extended for startup hooks and custom request IDs

createFakeStream, _fakeStream, and UniqueRequestID cleanly support the new SubscriptionOnStart hook and overrideable request ID while preserving existing behavior when the new fields are nil. This keeps the test double aligned with the production subscription datasource interface without introducing extra complexity.


5084-5457: Call sites correctly updated for new createFakeStream signature

All subscription tests that don’t exercise startup hooks now pass nil for the new subscriptionOnStartFn parameter. This keeps them behaviorally identical while conforming to the updated helper API.


5976-6380: Filter subscription tests remain behaviorally unchanged after helper update

The filter-focused subscription tests also consistently pass nil for subscriptionOnStartFn, so they continue to validate filtering semantics only, without being coupled to the new hook mechanism.


5459-5491: Test for basic SubscriptionOnStart invocation is sound

The "should call SubscriptionOnStart hook" test uses a buffered channel to detect the hook call without blocking and validates the trigger input, giving a clear, deterministic signal that the hook is wired correctly.


5634-5712: Multi-subscriber startup hook fan-out is well covered

The "SubscriptionOnStart ctx updater on multiple subscriptions with same trigger works" test robustly validates that each subscription sharing a trigger receives its own startup-hook update followed by the upstream message, which matches the intended per-subscription updater semantics.


5714-5748: High-volume startup-hook updater stress test aligns with backpressure strategy

The stress test that sends workChanBufferSize+1 updates from the startup hook before the single upstream message is a good fit for the goroutine-based backpressure approach and helps guard against deadlocks in the updater pipeline. Based on learnings.


5750-5830: Concurrent hook and upstream updates are exercised without introducing deadlocks

The coordination via firstMessageArrived and completionWg cleanly validates that a busy SubscriptionOnStart hook (including a goroutine that waits for upstream to start) can emit many updates concurrently with the stream without blocking or deadlocking, while correctly accounting for possible heartbeats.


5832-5881: Shared-trigger multi-subscription behavior is correctly validated

The test ensuring two subscriptions share the same trigger and both receive the full 0–100 counter sequence (with independent recorders) gives solid confidence in the trigger fan-out and completion logic for shared streams.


5882-5911: Error propagation from SubscriptionOnStart hook is tested in the right place

The test correctly checks that errors returned by the SubscriptionOnStart hook are rendered as GraphQL error responses via the async error writer rather than being returned directly from AsyncResolveGraphQLSubscription, and it asserts that the sentinel error text is present in the payload.

Copy link
Copy Markdown
Collaborator

@StarpTech StarpTech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dkorittki dkorittki merged commit 79d7e79 into master Nov 21, 2025
11 checks passed
@dkorittki dkorittki deleted the topic/streams-v1 branch November 21, 2025 21:06
StarpTech added a commit that referenced this pull request Nov 21, 2025
🤖 I have created a release *beep* *boop*
---


##
[2.0.0-rc.239](v2.0.0-rc.238...v2.0.0-rc.239)
(2025-11-21)


### Features

* allows hook in the subscriptions
([#1309](#1309))
([79d7e79](79d7e79))


### Bug Fixes

* pin the correct version of sjson in v2
([#1334](#1334))
([0009898](0009898))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **New Features**
* Added subscription hooks functionality to provide more flexible
subscription management options and enhanced control over subscription
workflows.

* **Bug Fixes**
* Fixed and pinned the correct dependency version to prevent potential
compatibility issues and ensure stable operation across environments.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants